package cn.doitedu.rtdw.rt_report;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestIntervalJoin {

    public static void main(String[] args) throws Exception {
        // 编程环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStreamSource<String> s1 = env.socketTextStream("doitedu", 9991);
        DataStreamSource<String> s2 = env.socketTextStream("doitedu", 9992);


        SingleOutputStreamOperator<Left> bean1 = s1.map(new MapFunction<String, Left>() {
            @Override
            public Left map(String value) throws Exception {

                String[] split = value.split(",");
                return new Left(split[0], Long.parseLong(split[1]));
            }
        });

        SingleOutputStreamOperator<Right> bean2 = s2.map(new MapFunction<String, Right>() {
            @Override
            public Right map(String value) throws Exception {
                String[] split = value.split(",");
                return new Right(split[0], Long.parseLong(split[1]));
            }
        });


        Table table1 = tenv.fromDataStream(bean1, Schema.newBuilder()
                .column("lid", DataTypes.STRING())
                .column("ts", DataTypes.BIGINT())
                .columnByExpression("rt", "to_timestamp_ltz(ts,3)")
                .watermark("rt", "rt")
                .build());

        Table table2 = tenv.fromDataStream(bean2, Schema.newBuilder()
                .column("rid", DataTypes.STRING())
                .column("ts", DataTypes.BIGINT())
                .columnByExpression("rt", "to_timestamp_ltz(ts,3)")
                .watermark("rt", "rt")
                .build());


        tenv.createTemporaryView("t1", table1);
        tenv.createTemporaryView("t2", table2);

        tenv.executeSql("create temporary view xxx as select t1.*,t2.rid from t1,t2 where " +
                "t1.rt between t2.rt- interval '1' second and t2.rt + interval '1' second " +
                "and t1.lid = t2.rid");

        Table xxx = tenv.from("xxx");
        tenv.toDataStream(xxx).print();

        env.execute();

    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Setter
    public static class Left {
        String lid;
        long ts;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Setter
    public static class Right {
        String rid;
        long ts;
    }

}
